/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.data.type.apply.batch.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Stream;

import org.apache.commons.collections4.CollectionUtils;
import org.unidata.mdm.data.context.DataContextFlags;
import org.unidata.mdm.data.context.MergeFromRelationRequestContext;
import org.unidata.mdm.data.context.MergeRelationsRequestContext;
import org.unidata.mdm.data.context.MergeToRelationRequestContext;
import org.unidata.mdm.data.dto.MergeRelationsDTO;
import org.unidata.mdm.data.po.data.RelationEtalonRemapFromPO;
import org.unidata.mdm.data.po.data.RelationEtalonRemapToPO;
import org.unidata.mdm.data.po.data.RelationOriginRemapPO;
import org.unidata.mdm.data.service.segments.relations.batch.RelationsMergeStartExecutor;
import org.unidata.mdm.data.type.apply.RelationMergeChangeSet;
import org.unidata.mdm.data.util.StorageUtils;
import org.unidata.mdm.system.type.batch.BatchIterator;
import org.unidata.mdm.system.type.pipeline.fragment.FragmentId;
import org.unidata.mdm.system.type.pipeline.fragment.InputFragment;

/**
 * @author Mikhail Mikhailov
 * Relation merge support accumulator.
 */
public class RelationMergeBatchSetAccumulator
    extends AbstractRelationBatchSetAccumulator<MergeRelationsRequestContext, MergeRelationsDTO>
    implements InputFragment<RelationMergeBatchSetAccumulator> {
    /**
     * This fragment ID.
     */
    public static final FragmentId<RelationMergeBatchSetAccumulator> ID
        = new FragmentId<>("RELATION_MERGE_BATCH_SET");
    /**
     * Collected rel. from etalons.
     */
    private final Map<Integer, List<RelationEtalonRemapFromPO>> etalonFromRemaps;
    /**
     * Collected rel. to etalons.
     */
    private final Map<Integer, List<RelationEtalonRemapToPO>> etalonToRemaps;
    /**
     * Collected origin remap (from one eid -> another eid) objects.
     */
    private final Map<Integer, List<RelationOriginRemapPO>> originRemaps;
    /**
     * Statistics.
     */
    private final RelationMergeBatchSetStatistics statistics;
    /**
     * Constructor.
     */
    public RelationMergeBatchSetAccumulator(int commitSize) {
        super(commitSize);
        etalonFromRemaps = new HashMap<>(StorageUtils.numberOfShards());
        etalonToRemaps = new HashMap<>(StorageUtils.numberOfShards());
        originRemaps = new HashMap<>(StorageUtils.numberOfShards());
        statistics = new RelationMergeBatchSetStatistics();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public FragmentId<RelationMergeBatchSetAccumulator> fragmentId() {
        return ID;
    }
    /**
     * {@inheritDoc}
     */
    @SuppressWarnings("unchecked")
    @Override
    public RelationMergeBatchSetStatistics statistics() {
        return statistics;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public String getStartTypeId() {
        return RelationsMergeStartExecutor.SEGMENT_ID;
    }
    /**
     * Adds a single rel etalon remap record update.
     * @param po the update
     */
    protected void accumulateEtalonRemapFrom(RelationEtalonRemapFromPO po) {
        if (Objects.nonNull(po)) {
            etalonFromRemaps.computeIfAbsent(po.getShard(), k -> new ArrayList<RelationEtalonRemapFromPO>(commitSize))
                .add(po);
        }
    }
    /**
     * Adds several rel etalon remap record updates. Needed for merge op.
     * @param pos the update
     */
    protected void accumulateEtalonsRemapFrom(List<RelationEtalonRemapFromPO> pos) {
        if (CollectionUtils.isNotEmpty(pos)) {
            for (int i = 0; i < pos.size(); i++) {
                accumulateEtalonRemapFrom(pos.get(i));
            }
        }
    }
    /**
     * Adds a single rel etalon remap record update.
     * @param po the update
     */
    protected void accumulateEtalonRemapTo(RelationEtalonRemapToPO po) {
        if (Objects.nonNull(po)) {
            etalonToRemaps.computeIfAbsent(po.getShard(), k -> new ArrayList<RelationEtalonRemapToPO>(commitSize))
                .add(po);
        }
    }
    /**
     * Adds several rel etalon remap record updates. Needed for merge op.
     * @param pos the update
     */
    protected void accumulateEtalonsRemapTo(List<RelationEtalonRemapToPO> pos) {
        if (CollectionUtils.isNotEmpty(pos)) {
            for (int i = 0; i < pos.size(); i++) {
                accumulateEtalonRemapTo(pos.get(i));
            }
        }
    }
    /**
     * Adds a single rel origin remap record update.
     * @param po the update
     */
    protected void accumulateOriginRemap(RelationOriginRemapPO po) {
        if (Objects.nonNull(po)) {
            originRemaps.computeIfAbsent(po.getShard(), k -> new ArrayList<RelationOriginRemapPO>(commitSize))
                .add(po);
        }
    }
    /**
     * Adds several rel origin remap record updates. Needed for merge op.
     * @param pos the update
     */
    protected void accumulateOriginsRemap(List<RelationOriginRemapPO> pos) {
        if (CollectionUtils.isNotEmpty(pos)) {
            for (int i = 0; i < pos.size(); i++) {
                accumulateOriginRemap(pos.get(i));
            }
        }
    }
    /**
     * @return the etalonRemapFromPOs
     */
    public Map<Integer, List<RelationEtalonRemapFromPO>> getEtalonFromRemaps() {
        return etalonFromRemaps;
    }
    /**
     * @return the etalonRemapToPOs
     */
    public Map<Integer, List<RelationEtalonRemapToPO>> getEtalonToRemaps() {
        return etalonToRemaps;
    }
    /**
     * @return the originRemaps
     */
    public Map<Integer, List<RelationOriginRemapPO>> getOriginRemaps() {
        return originRemaps;
    }

    /**
     * Accumulate artifacts.
     * @param ctx the context
     */
    @Override
    public void accumulate(MergeRelationsRequestContext ctx) {

        Stream.concat(ctx.getRelationsFrom().values().stream(), ctx.getRelationsTo().values().stream())
            .flatMap(Collection::stream)
            .forEach(iCtx -> {

                RelationMergeChangeSet batchSet = iCtx.changeSet();
                if (Objects.isNull(batchSet)) {
                    return;
                }

                accumulateEtalonsRemapFrom(batchSet.getEtalonFromRemaps());
                accumulateEtalonsRemapTo(batchSet.getEtalonToRemaps());
                accumulateOriginsRemap(batchSet.getOriginRemaps());
                accumulateEtalonUpdates(batchSet.getEtalonUpdates());
                accumulateWipeExternalKeys(batchSet.getExternalKeyWipes());
                accumulateInsertExternalKeys(batchSet.getExternalKeyInserts());

                if (CollectionUtils.isNotEmpty(batchSet.getIndexRequestContexts())) {
                    indexUpdates.addAll(batchSet.getIndexRequestContexts());
                }
            });
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public BatchIterator<MergeRelationsRequestContext> iterator() {
        return new RelationMergeBatchIterator();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void discharge() {
        super.discharge();
        etalonFromRemaps.values().forEach(Collection::clear);
        etalonToRemaps.values().forEach(Collection::clear);
        originRemaps.values().forEach(Collection::clear);
        statistics.reset();
    }

    /**
     * @author Mikhail Mikhailov
     * Simple batch iterator.
     */
    private class RelationMergeBatchIterator implements BatchIterator<MergeRelationsRequestContext> {
        /**
         * List iterator.
         */
        private ListIterator<MergeRelationsRequestContext> i = workingCopy.listIterator();
        /**
         * Current entry.
         */
        private MergeRelationsRequestContext current = null;
        /**
         * Constructor.
         */
        public RelationMergeBatchIterator() {
            super();
        }
        /**
         * If there are more elements to iterate.
         * @return true, if so, false otherwise
         */
        @Override
        public boolean hasNext() {

            boolean hasNext = i.hasNext();
            if (!hasNext && current != null) {
                accumulate(current);
            }

            return hasNext;
        }
        /**
         * Next context for origin upsert
         * @return next context
         */
        @Override
        public MergeRelationsRequestContext next() {

            MergeRelationsRequestContext next = i.next();
            if (current != null) {
                accumulate(current);
            }

            init(next);

            current = next;
            return next;
        }
        /**
         * Removes current element.
         */
        @Override
        public void remove() {
            i.remove();
            current = null;
        }
        /**
         * Does some preprocessing.
         * @param ctx the upsert context
         */
        private void init(MergeRelationsRequestContext ctx) {

            for (Entry<String, List<MergeFromRelationRequestContext>> entry : ctx.getRelationsFrom().entrySet()) {
                for (MergeFromRelationRequestContext dCtx : entry.getValue()) {

                    if (Objects.nonNull(dCtx.changeSet())) {
                        continue;
                    }

                    dCtx.changeSet(new RelationMergeBatchSet());
                    dCtx.setFlag(DataContextFlags.FLAG_BATCH_OPERATION, true);
                }
            }

            for (Entry<String, List<MergeToRelationRequestContext>> entry : ctx.getRelationsTo().entrySet()) {
                for (MergeToRelationRequestContext dCtx : entry.getValue()) {

                    if (Objects.nonNull(dCtx.changeSet())) {
                        continue;
                    }

                    dCtx.changeSet(new RelationMergeBatchSet());
                    dCtx.setFlag(DataContextFlags.FLAG_BATCH_OPERATION, true);
                }
            }

            ctx.setFlag(DataContextFlags.FLAG_BATCH_OPERATION, true);
        }
    }
}
